

CREATE TABLE tbl_data_employee (
    emp_id INT,
    name VARCHAR,
    dept_id INT
) WITH (
    'connector' = 'filesystem',
    'path' = '/Users/a58/xiyong.lxy/tmp/flink/table/employee_information.json',
    'format' = 'json'
);


select * from tbl_data_employee;

CREATE TABLE kafka_table (
    id INT,
    name STRING,
    age INT
) WITH (
    'connector' = 'kafka',
    'topic' = '<your-topic>',
    'properties.bootstrap.servers' = '<your-broker-list>',
    'format.type' = 'csv', -- 指定数据格式为CSV
    'scan.startup.mode' = 'earliest-offset', -- 从最早的消息开始读取
    'format.field-delimiter' = ',', -- 设置字段之间的分隔符为逗号（可根据需求修改）
    'format.ignore-parse-errors' = 'true' -- 忽略无法解析的错误记录
);
